# NameServer 源码分析

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)


# 1. NameServer 整体流程

RocketMQ 中,NameServer 是一个无状态、轻量级的路由服务组件,可以看作轻量级的注册中心, 主要负责:

  • 维护 Topic → Broker 的路由信息
  • 接收 Broker 注册 / 心跳
  • 为 Producer / Consumer 提供路由查询能力

从宏观上看,NameServer 的核心流程可以拆成 3 个阶段:启动 → 路由注册 → 路由剔除

# 1.1 NameServer 启动

NameServer 启动后主要完成:

  • 初始化内存路由表结构(RouteInfoManager 中的各类 Map)
  • 启动 Netty 网络服务端,监听指定端口(默认 9876)
  • 注册请求处理器,接受 Broker 注册、心跳以及 Producer / Consumer 的路由查询请求
  • 开启定时任务,定期扫描、剔除不活跃的 Broker

此时 NameServer 自身是无状态路由节点,所有路由数据都仅存储在内存中,不做持久化。

# 1.2 路由注册

Broker 启动后会向 所有 NameServer 周期性发送:

  • 注册(REGISTER_BROKER)请求
  • 心跳(HEART_BEAT)请求

上报的核心内容包括:

  • Broker 集群名称(clusterName
  • Broker 名称(brokerName)与主从角色(brokerId
  • Topic 配置(TopicConfig 列表)
  • Broker 地址(IP + Port)
  • FilterServer 列表(如果有)

NameServer 收到注册 / 心跳请求后会更新内存中的多张路由表,包括但不限于:

  • Topic → Queue 信息(topicQueueTable
  • BrokerName → BrokerData(brokerAddrTable
  • ClusterName → BrokerName 集合(clusterAddrTable
  • BrokerAddr → 存活信息(brokerLiveTable

Producer、Consumer 查询路由时,即是基于这些内存结构来获取可用 Broker 地址和队列信息。

# 1.3 路由剔除

NameServer 与每台 Broker 之间保持长连接,并通过定时任务完成 Broker 存活检查:

  • 10 秒 扫描一次 brokerLiveTable
  • 如果某个 Broker 超过 120 秒 未收到心跳,判定为下线 / 失效

当判定 Broker 失效时,NameServer 会:

  • 关闭与该 Broker 的 Netty Channel
  • brokerLiveTable 中移除该 Broker
  • topicQueueTable 中剔除该 Broker 下的所有队列
  • brokerAddrTableclusterAddrTable 等结构中移除其相关数据

之后 Producer / Consumer 下一次从 NameServer 拉取路由时,就不会再拿到该失效 Broker 的地址,从而实现集群的动态感知与高可用

# 2. NameServer 启动流程源码

NameServer 是一个独立的进程,核心入口在 NamesrvStartup,最终构建并启动 NamesrvController

NamesrvStartup.main()
  └─> main0()
       └─> createNamesrvController()
            └─> controller.initialize()
            └─> controller.start()
1
2
3
4
5

image.png

# 2.1 加载KV配置

NamesrvStartup#createNamesrvController 负责解析启动参数、加载配置文件并创建 NamesrvController

public static NamesrvController createNamesrvController(String[] args)
        throws IOException, JoranException {
    // 省略参数解析相关代码...

    // 1. 创建 NamesrvConfig(NameServer 自身配置)
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // 2. 创建 NettyServerConfig(网络层配置)
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // 默认监听端口
    nettyServerConfig.setListenPort(9876);

    // 3. 解析 -c 参数:指定外部配置文件
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            Properties properties = new Properties();
            properties.load(in);

            // 将 Properties 中的配置映射到对象
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    // 4. 解析 -p 参数:打印当前配置并退出(调试用)
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    // 5. 启动参数覆盖配置文件(优先级更高)
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    // 6. 创建 NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    return controller;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

几点小结:

  • -c:指定配置文件路径,加载后映射到 NamesrvConfigNettyServerConfig
  • -p:打印当前最终配置(包括默认值 + 配置文件 + 命令行),然后退出,一般用于排查配置问题
  • 命令行参数最终会覆盖配置文件中的同名配置项

# 2.2 构建 NettyRemotingServer(接收路由 / 心跳)

// NamesrvController#initialize()
public boolean initialize() {
    // ... 其他初始化逻辑

    // 创建 NettyRemotingServer,负责网络收发
    this.remotingServer = new NettyRemotingServer(
            this.nettyServerConfig, this.brokerHousekeepingService);

    // 注册请求处理器等
    this.registerProcessor();

    // ... 其他初始化逻辑
    return true;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

NameServer 启动时会调用:

// NamesrvController#start()
public void start() throws Exception {
    // 启动 Netty 服务,正式对外提供路由/注册/心跳服务
    this.remotingServer.start();
}
1
2
3
4
5

# 2.3 定时任务剔除超时 Broker

NamesrvController#initialize() 中还会启动一个定时任务,用于扫描并移除不活跃的 Broker:

public boolean initialize() {
    // ...

    // 业务处理线程池(用于处理网络请求)
    this.remotingExecutor = Executors.newFixedThreadPool(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // 每隔 10s 扫描一次 Broker 存活状态
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // ...
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

路由剔除核心逻辑在 RouteInfoManager#scanNotActiveBroker()

// Broker 连接超时时间(默认 2 分钟)
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it =
            this.brokerLiveTable.entrySet().iterator();

    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();

        // 若上次心跳时间 + 2min < 当前时间,则认为该 Broker 已失效
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // 关闭网络连接
            RemotingUtil.closeChannel(next.getValue().getChannel());
            // 从 brokerLiveTable 中移除
            it.remove();
            log.warn("The broker channel expired, {} {}ms",
                     next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            // 进一步从路由表中剔除该 Broker(topicQueueTable、brokerAddrTable、clusterAddrTable 等)
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

时间点对照:

  • Broker:默认 每 30 秒 向所有 NameServer 发送一次心跳
  • NameServer:每 10 秒 扫描一次 brokerLiveTable
  • 超过 120 秒未收到心跳 → 判定 Broker 失效,进行路由剔除

这会导致一个设计上的“短暂不一致”:

  • NameServer 认为 Broker 仍然存活(尚未超过 120s)
  • 但 Broker 可能已经宕机
  • 此时 Producer / Consumer 仍可能拿到已下线 Broker 的地址

RocketMQ 的处理方式是:将故障规避策略放在 Producer / Consumer 端,例如:

  • 发送异常后重试其他队列 / Broker
  • 消费失败后重试 / 重新拉取

NameServer 因此可以做到非常轻量、无状态、易扩展,这也是它的设计理念:简单、快速,复杂性放在业务端去兜底。

路由信息的删除有两个触发点:

  1. 被动删除:定时扫描发现 Broker 超时(2min 无心跳)
  2. 主动删除:Broker 正常关闭时,会主动发送 UNREGISTER_BROKER 请求,通知 NameServer 移除路由

两者最终都会调用同一套“路由删除逻辑”,从多张路由表中删除与该 Broker 相关的全部信息。

# 3 NameServer设计亮点

# 3.1 读写锁:读多写少的高并发优化

RouteInfoManager 中使用了 ReentrantReadWriteLock 读写锁来保护路由表的并发访问:

public class RouteInfoManager {
    // 适用于“读多写少”场景的读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // topic 对应队列信息
    private final HashMap<String /* topic */, List<QueueData>> topicQueueTable;
    // broker 基础信息
    private final HashMap<String /* brokerName */, BrokerData> brokerAddrTable;
    // 集群名 -> BrokerName 集合
    private final HashMap<String /* clusterName */, Set<String /* brokerName */>> clusterAddrTable;
    // brokerAddr -> 存活信息(心跳)
    private final HashMap<String /* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // brokerAddr -> FilterServer 列表
    private final HashMap<String /* brokerAddr */, List<String> /* Filter Server */> filterServerTable;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

典型的读写场景:

  1. 生产者发送消息时:频繁读取路由信息

    public byte[] getAllTopicList() {
        TopicList topicList = new TopicList();
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                topicList.getTopicList().addAll(this.topicQueueTable.keySet());
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("getAllTopicList Exception", e);
        }
    
        return topicList.encode();
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
  2. Broker 心跳 / 注册时:更新路由信息(写)

    public RegisterBrokerResult registerBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final Channel channel) {
    
        RegisterBrokerResult result = new RegisterBrokerResult();
    
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                // 维护 clusterAddrTable
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (brokerNames == null) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);
    
                boolean registerFirst = false;
                // 维护 brokerAddrTable
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (brokerData == null) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
    
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                // ... 省略 master/slave 处理逻辑
    
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }
    
        return result;
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44

总结一下:

  • synchronized / ReentrantLock排他锁,同一时刻只有一个线程可进入
  • ReentrantReadWriteLock 则将锁拆分为读锁 / 写锁:
    • 多个读线程可以并发读
    • 写线程需要独占,会阻塞读 / 写线程

NameServer 的路由表属于典型的“读远多于写”的场景:

  • Producer / Consumer 不断查询路由(读多)
  • Broker 心跳 / 注册才会触发写,频率相对较低(写少)

因此使用读写锁可以有效提升并发性能。

# 3.2 路由信息全部基于内存,不做持久化

NameServer 内部通过几张 HashMap 来管理所有路由信息:

  • topicQueueTable: Topic → 队列信息,用于 Producer 选择队列时做负载均衡
  • brokerAddrTable: BrokerName → BrokerData(含 clusterName、master/slave 地址等)
  • clusterAddrTable: ClusterName → BrokerName 集合
  • brokerLiveTable: BrokerAddr → BrokerLiveInfo(上次心跳时间、Netty Channel 等)
  • filterServerTable: BrokerAddr → FilterServer 列表(用于类模式消息过滤)

NameServer 不对这些信息做任何落盘操作:

  • 路由信息只存在于内存中
  • Broker 重启后会重新向所有 NameServer 注册
  • NameServer 挂了重启,也不会影响整个集群的正确性

持久化的责任完全交给 Broker,NameServer 只关注“当前可用路由”,因此实现非常轻量,吞吐也非常高。

# 3.3 NameServer 无状态化与多机房容灾

img

几个关键特性:

  • NameServer 之间不通信、不做数据同步
  • 每个 Broker 会向所有 NameServer 注册 / 发送心跳
  • Producer / Consumer 与集群中任意一台 NameServer 建立长连接即可

多机房部署示例:

  • 假设一个 RocketMQ 集群跨两个机房部署,每个机房都有若干 NameServer、Broker、客户端
  • 当两个机房之间网络链路中断时:
    • 各自机房内的 NameServer 仍然可用
    • NameServer 只会保留本机房仍有心跳的 Broker
    • 客户端在本机房内访问本机房 NameServer,只能拿到本机房内可达的 Broker 路由

这意味着:

  • 网络分区不会影响 NameServer 自身的可用性(反正它们不互相同步)
  • NameServer 只基于是否能收到心跳来判断 Broker 是否可用
  • 对于跨机房的 Broker,只要网络断了,就会被各自机房的 NameServer 剔除出路由信息

整体上,RocketMQ 通过 NameServer 的无状态 + 全内存设计,换来了:

  • 极高的路由读写性能
  • 非常简单的水平扩展与多机房容灾能力
  • 清晰的职责边界(路由中心 ≠ 元数据中心)